package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo19PageRank {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("PageRank").setMaster("local")

    val sc = new SparkContext(conf)

    //读取数据
    val pageRank: RDD[String] = sc.textFile("data/pagerank.txt")

    var pageRDD: RDD[(String, List[String], Double)] = pageRank.map(line => {
      val split: Array[String] = line.split(" ")

      //当前网页
      val page: String = split(0)

      //出链列表
      val list: List[String] = split(1).split(",").toList

      (page, list, 1.0)
    })
    //网页编号和出链列表
    val pageListRDD: RDD[(String, List[String])] = pageRDD.map(kv =>(kv._1,kv._2))

    //阻尼系数
    val Q = 0.85

    //网页的数量
    val N: Long = pageRDD.count()

    var flag = true
    while (flag) {
      //将网页的pr值平分给出链列表
      val avgPageRDD: RDD[(String, Double)] = pageRDD.flatMap {
        case (_: String, list: List[String], pr: Double) =>
          val avgPr: Double = pr / list.size
          list.map(p => (p, avgPr))
      }
      //计算新的pr值
      val nowPrRDD: RDD[(String, Double)] = avgPageRDD
        .reduceByKey(_ + _)
        .mapValues(pr => (1 - Q) / N + Q * pr) //增加阻尼系数

      //将每个网页的出链列表关联回去
      val joinRDD: RDD[(String, (Double, List[String]))] = nowPrRDD.join(pageListRDD)

      //整理数据
      val resultRDD: RDD[(String, List[String], Double)] = joinRDD.map {
        case (page: String, (pr: Double, list: List[String])) =>
          (page, list, pr)
      }

      /**
        * 计算每个网页新的pr值，差值的平均值
        */
      //上一次的网页的pr值
      val lastRDD: RDD[(String, Double)] = pageRDD.map(kv => (kv._1, kv._3))

      //计算网页的pr 差值
      val prJoinRDD: RDD[(String, (Double, Double))] = nowPrRDD.join(lastRDD)

      val prChaRDD: RDD[Double] = prJoinRDD.map {
        case (_: String, (noePR: Double, lastPr: Double)) =>
          math.abs(lastPr - noePR)
      }
      //计算平均值和差值
      val avgPr: Double = prChaRDD.sum() / prChaRDD.count()

      println("差值平均值:" + avgPr)

      //如果差值平均值小于阀值，跳出循环
      if (avgPr < 0.001) {
        flag = false
      }

      //第二次计算使用第一次的结果
      pageRDD = resultRDD
    }
    pageRDD.foreach(println)
  }
}
